KAFKA-20724: prevent duplicate task in tasks() during pause and resume#22649
Open
zoro30102000 wants to merge 1 commit into
Open
KAFKA-20724: prevent duplicate task in tasks() during pause and resume#22649zoro30102000 wants to merge 1 commit into
zoro30102000 wants to merge 1 commit into
Conversation
9917262 to
847adf8
Compare
lucasbru
reviewed
Jun 27, 2026
| private final Queue<StreamTask> restoredActiveTasks = new LinkedList<>(); | ||
| private final Lock restoredActiveTasksLock = new ReentrantLock(); | ||
| // visible for testing | ||
| final Lock restoredActiveTasksLock = new ReentrantLock(); |
Member
There was a problem hiding this comment.
We are using the same lock in other places without making it visible for testing.
How are we testing the other uses of the lock. Can we make it consistent?
Member
|
Fix looks good to me, one Q about the testing approach |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
JIRA: https://issues.apache.org/jira/browse/KAFKA-20724
DefaultStateUpdater.tasks() can return the same TaskId twice. The snapshot is taken under
executeWithQueuesLocked, which holds tasksAndActionsLock, restoredActiveTasksLock and
exceptionsAndFailedTasksLock, but not updatingTasks or pausedTasks. pauseTask does pausedTasks.put(id)
then updatingTasks.remove(id), and resumeTask does the reverse, so for a short window the task sits in
both maps. streamOfTasks() reads both, and ReadOnlyTask has no equals/hashCode, so the returned Set keeps
both wrappers. A caller that does one remove per entry then removes the same task twice. The first remove
succeeds; the second reaches removeTask, finds the task in none of the collections, and completes the
future with null, so waitForFuture turns it into "Task X was not found in the state updater. This
indicates a bug." The same duplicate also breaks TaskManager.allTasks(), which uses Collectors.toMap, with
"Duplicate key".
Fix: take restoredActiveTasksLock (the same lock tasks() already holds) around the two map writes in
pauseTask and resumeTask, so a reader never observes the task in both maps. The changelogReader
transitions stay outside the lock.
Tests: two tests in DefaultStateUpdaterTest hold restoredActiveTasksLock and assert that the pause and
resume transitions block on it. They fail by timeout if the lock is dropped from either method.
restoredActiveTasksLock was relaxed to package private so the tests can take it.
Noticed while here, not fixed in this PR: the standalone updatingTasks() and pausedTasks() accessors
snapshot without restoredActiveTasksLock, so a caller that composes both itself could still see an
inconsistent pair. There is no such caller today (pausedTasks() is test only), so I left it out. Happy to
fold it in here or track it separately.
Reviewers: Lucas Brutschy lbrutschy@confluent.io